package com.uxsino.reactorq.clientserver;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.uxsino.reactorq.model.IndicatorValue;
import com.uxsino.reactorq.processor.QueueProcessor;

import reactor.core.Disposable;

/**
 * Handle for a single in-flight query.
 *
 * <p>On construction the handle subscribes to the given processor, keeping only the
 * messages whose {@code sequenceId} equals the query id (the sequence id of a query is
 * kept identical to its query id). Messages are ignored until {@link #start(long)} has
 * been called. While running, value messages are buffered and the registered callbacks
 * are invoked as messages arrive; a task status of {@code END_SUCCESS} or
 * {@code END_ERROR} disposes the handle. A timeout task scheduled by
 * {@link #start(long)} notifies the timeout and error callbacks if it is not cancelled
 * first.
 *
 * <p>Callbacks are configured through the fluent {@code onXxx} methods, each of which
 * returns this handle. Exceptions thrown by callbacks are logged and not propagated.
 *
 * <p>NOTE(review): messages are handled on whatever thread the processor delivers them
 * on, while the timeout task runs on a shared scheduler thread; the mutable state of this
 * class is not synchronized, so callbacks should be registered before {@link #start(long)}.
 */
public class QueryHandle {
    private static final Logger logger = LoggerFactory.getLogger(QueryHandle.class);

    /** Single-threaded scheduler shared by all handles; it runs the timeout tasks. */
    private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    /** Query id; also the sequence id that incoming messages are matched against. */
    private final String queryId;

    /** Pending timeout task, set by {@link #start(long)} and cleared by {@link #dispose()}. */
    private ScheduledFuture<?> timeoutFuture;

    /** Subscription to the processor; cleared once it has been disposed. */
    private Disposable valueSubscription;

    private final QueueProcessor<IndicatorValue> processor;

    /** Every {@code Value} message received while running, in arrival order. */
    private final List<IndicatorValue> buffer = new ArrayList<>();

    /** Accumulated content of every {@code FailedMsg} message received while running. */
    private final Map<String, List<String>> failedIndicators = new HashMap<>();

    // callback consumers
    private Consumer<List<IndicatorValue>> successHandler = null;

    private Consumer<QueryHandle> timeoutHandler = null;

    private Consumer<QueryHandle> errorHandler = null;

    private Consumer<IndicatorValue> itemHandler = null;

    private Consumer<Map<String, List<String>>> failedIndicatorsHandler = null;

    private Consumer<QueryHandle> completeHandler = null;

    /** {@code true} between {@link #start(long)} and {@link #dispose()}. */
    private final AtomicBoolean running = new AtomicBoolean(false);

    private final Consumer<QueryHandle> onStart;

    /**
     * Creates the handle and subscribes it to {@code processor}.
     *
     * @param processor source of indicator messages
     * @param queryId   id of the query; only messages carrying this sequence id are handled
     * @param onStart   action invoked with this handle by {@link #start(long)}; may be
     *                  {@code null}
     */
    public QueryHandle(QueueProcessor<IndicatorValue> processor, String queryId, Consumer<QueryHandle> onStart) {
        // All fields are assigned before subscribing, so that a message delivered during
        // or right after subscription never observes a half-initialized handle.
        this.processor = processor;
        this.queryId = queryId;
        this.onStart = onStart;
        this.valueSubscription = processor.filter(itm -> queryId.equals(itm.sequenceId))
            .subscribe(this::onMessage);
    }

    /**
     * Handles one message from the processor.
     *
     * <p>The message is ignored when the handle is not running, when it is {@code null},
     * when its sequence id differs from the query id, or when it has no type.
     *
     * @param msg the received message
     */
    public void onMessage(IndicatorValue msg) {
        if (!running.get()) {
            return;
        }
        if (msg == null || !queryId.equals(msg.sequenceId)) {
            return;
        }
        if (msg.type == null) {
            logger.warn("ignoring message without type. queryId {}", queryId);
            return;
        }
        switch (msg.type) {
        case Value:
            buffer.add(msg);
            notifyHandler(itemHandler, msg, "exception dealing with query result item");
            break;
        case TaskStatus:
            handleTaskStatus(msg);
            break;
        case FailedMsg:
            handleFailedIndicators(msg);
            break;
        case RequestSeq:
            // Nothing to do: the sequence id is kept identical to the query id.
        case ErrorMsg:
        default:
            break;
        }
    }

    /** Reacts to a task status message; both end states dispose the handle. */
    private void handleTaskStatus(IndicatorValue msg) {
        if (!(msg.value instanceof String)) {
            logger.warn("ignoring task status without a string value. queryId {}", queryId);
            return;
        }
        IndicatorValue.STATUS status;
        try {
            status = IndicatorValue.STATUS.valueOf((String) msg.value);
        } catch (IllegalArgumentException e) {
            // An unknown status must not escape into the subscription callback.
            logger.warn("ignoring unknown task status {}. queryId {}", msg.value, queryId);
            return;
        }
        switch (status) {
        case END_ERROR:
            notifyHandler(errorHandler, this, "exception dealing with error");
            dispose();
            break;
        case END_SUCCESS:
            notifyHandler(successHandler, buffer, "exception dealing with query result");
            notifyHandler(completeHandler, this, "exception dealing with query completion");
            dispose();
            break;
        default:
            break;
        }
    }

    /** Merges the failed indicators carried by the message and notifies the callback. */
    private void handleFailedIndicators(IndicatorValue msg) {
        @SuppressWarnings("unchecked")
        Map<String, List<String>> losers = (Map<String, List<String>>) msg.value;
        if (losers == null) {
            return;
        }
        failedIndicators.putAll(losers);
        notifyHandler(failedIndicatorsHandler, failedIndicators, "exception dealing with failed indicators");
    }

    /**
     * Invokes {@code handler} with {@code argument} if a handler is registered; any
     * exception it throws is logged with {@code failureMessage} instead of propagating.
     */
    private static <T> void notifyHandler(Consumer<T> handler, T argument, String failureMessage) {
        if (handler == null) {
            return;
        }
        try {
            handler.accept(argument);
        } catch (Exception e) {
            logger.error(failureMessage, e);
        }
    }

    /**
     * Body of the timeout task: notifies the timeout callback, disposes the
     * subscription, then notifies the error callback.
     */
    private void onTimeoutElapsed() {
        notifyHandler(timeoutHandler, this, "exception dealing with timeout");
        // Work on a local copy: dispose() may clear the field concurrently.
        Disposable subscription = valueSubscription;
        if (subscription != null) {
            subscription.dispose();
            valueSubscription = null;
        }
        notifyHandler(errorHandler, this, "exception dealing with error");
    }

    /**
     * Starts the query: marks the handle as running, schedules the timeout task and
     * invokes the {@code onStart} action. Calling it on a handle that is already running
     * only logs a warning.
     *
     * @param timeoutMillis delay in milliseconds after which the timeout task runs
     * @return this handle
     */
    public QueryHandle start(long timeoutMillis) {
        if (running.getAndSet(true)) {
            logger.warn("already started. queryId {}", queryId);
            return this;
        }
        timeoutFuture = scheduler.schedule(this::onTimeoutElapsed, timeoutMillis, TimeUnit.MILLISECONDS);
        if (onStart != null) {
            onStart.accept(this);
        }
        return this;
    }

    /**
     * Registers the callback that receives all buffered values on {@code END_SUCCESS}.
     *
     * @param action the callback, or {@code null} to clear it
     * @return this handle
     */
    public QueryHandle onSuccess(Consumer<List<IndicatorValue>> action) {
        successHandler = action;
        return this;
    }

    /**
     * Registers the callback invoked when the timeout task runs.
     *
     * @param timeoutHandler the callback, or {@code null} to clear it
     * @return this handle
     */
    public QueryHandle onTimeout(Consumer<QueryHandle> timeoutHandler) {
        // "this." is required: the parameter shadows the field.
        this.timeoutHandler = timeoutHandler;
        return this;
    }

    /**
     * Registers the callback invoked on {@code END_ERROR} and after a timeout.
     *
     * @param action the callback, or {@code null} to clear it
     * @return this handle
     */
    public QueryHandle onError(Consumer<QueryHandle> action) {
        errorHandler = action;
        return this;
    }

    /**
     * Registers the callback invoked for every received value message.
     *
     * @param action the callback, or {@code null} to clear it
     * @return this handle
     */
    public QueryHandle onItem(Consumer<IndicatorValue> action) {
        itemHandler = action;
        return this;
    }

    /**
     * Registers the callback that receives the accumulated failed indicators each time a
     * non-null failed-indicators message arrives.
     *
     * @param action the callback, or {@code null} to clear it
     * @return this handle
     */
    public QueryHandle onFailedIndicators(Consumer<Map<String, List<String>>> action) {
        failedIndicatorsHandler = action;
        return this;
    }

    /**
     * Registers the callback invoked after the success callback on {@code END_SUCCESS}.
     *
     * @param action the callback, or {@code null} to clear it
     * @return this handle
     */
    public QueryHandle onComplete(Consumer<QueryHandle> action) {
        completeHandler = action;
        return this;
    }

    /**
     * @return the id of the query this handle belongs to
     */
    public String getQueryId() {
        return queryId;
    }

    /**
     * Stops the handle: clears the running flag, cancels the pending timeout task and
     * disposes the subscription to the processor. Safe to call more than once.
     */
    public void dispose() {
        running.set(false);
        // Local copies guard against the timeout task clearing a field between the null
        // check and the call.
        ScheduledFuture<?> pendingTimeout = timeoutFuture;
        if (pendingTimeout != null) {
            pendingTimeout.cancel(true);
            timeoutFuture = null;
        }
        Disposable subscription = valueSubscription;
        if (subscription != null) {
            subscription.dispose();
            valueSubscription = null;
        }
    }

    /**
     * get all received query results
     * @return the live internal list of results (not a copy)
     */
    public List<IndicatorValue> getResults() {
        return buffer;
    }

    /**
     * get failed indicators
     * @return the live internal map of accumulated failed indicators (not a copy)
     */
    public Map<String, List<String>> getFailedIndicators() {
        return failedIndicators;
    }

}
